package org.conan.myhadoop.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;
import org.conan.myhadoop.common.CommonConstant;

public class HdfsUtil {

	private static final String HDFS = CommonConstant.HadoopConstand.HADOOP_HDFS;

	private static Configuration jobConf = null;

	static {
		jobConf = config();
	}

	public HdfsUtil() {
		this(HDFS, jobConf);
	}

	public HdfsUtil(String hdfs, Configuration conf) {
		this.hdfsPath = hdfs;
		this.conf = conf;
	}

	private String hdfsPath;
	private Configuration conf;

	public static void main(String[] args) throws IOException {
		HdfsUtil hdfs = new HdfsUtil();
		hdfs.upload("C:/Users/A/Desktop/i-wordcount.txt", "/input/new");
		// hdfs.ls("/tmp/new");
//		hdfs.rename("/user/hdfs/pagerank/tmp3", "/user/hdfs/pagerank/tmp4");
	}

	public static Configuration config() {
		JobConf conf = new JobConf(HdfsUtil.class);
		conf.setJobName("HdfsUtil");
		conf.addResource("classpath:/hadoop/core-site.xml");
		conf.addResource("classpath:/hadoop/hdfs-site.xml");
		conf.addResource("classpath:/hadoop/mapred-site.xml");
		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());  
//		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() );
		return conf;
	}

	public void mkdirs(String folder) throws IOException {
		Path path = new Path(folder);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		if (!fs.exists(path)) {
			fs.mkdirs(path);
			System.out.println("Create: " + folder);
		}
		fs.close();
	}

	public void rmr(String folder) throws IOException {
		Path path = new Path(folder);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		fs.deleteOnExit(path);
		System.out.println("Delete: " + folder);
		fs.close();
	}

	public void rename(String src, String dst) throws IOException {
		Path name1 = new Path(src);
		Path name2 = new Path(dst);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		fs.rename(name1, name2);
		System.out.println("Rename: from " + src + " to " + dst);
		fs.close();
	}

	public void ls(String folder) throws IOException {
		Path path = new Path(folder);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		FileStatus[] list = fs.listStatus(path);
		System.out.println("ls: " + folder);
		System.out.println("==========================================================");
		for (FileStatus f : list) {
			System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
		}
		System.out.println("==========================================================");
		fs.close();
	}

	public void createFile(String file, String content) throws IOException {
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		byte[] buff = content.getBytes();
		FSDataOutputStream os = null;
		try {
			os = fs.create(new Path(file));
			os.write(buff, 0, buff.length);
			System.out.println("Create: " + file);
		} finally {
			if (os != null)
				os.close();
		}
		fs.close();
	}

	public void upload(String local, String remote) throws IOException {
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		fs.copyFromLocalFile(new Path(local), new Path(remote));
		System.out.println("copy from: " + local + " to " + remote);
		fs.close();
	}

	public void download(String remote, String local) throws IOException {
		Path path = new Path(remote);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		fs.copyToLocalFile(path, new Path(local));
		System.out.println("download: from" + remote + " to " + local);
		fs.close();
	}

	public String cat(String remoteFile) throws IOException {
		Path path = new Path(remoteFile);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		FSDataInputStream fsdis = null;
		System.out.println("cat: " + remoteFile);

		OutputStream baos = new ByteArrayOutputStream();
		String str = null;

		try {
			fsdis = fs.open(path);
			IOUtils.copyBytes(fsdis, baos, 4096, false);
			str = baos.toString();
		} finally {
			IOUtils.closeStream(fsdis);
			fs.close();
		}
		System.out.println(str);
		return str;
	}

	public void location() throws IOException {
		// String folder = hdfsPath + "create/";
		// String file = "t2.txt";
		// FileSystem fs = FileSystem.get(URI.create(hdfsPath), new
		// Configuration());
		// FileStatus f = fs.getFileStatus(new Path(folder + file));
		// BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
		//
		// System.out.println("File Location: " + folder + file);
		// for (BlockLocation bl : list) {
		// String[] hosts = bl.getHosts();
		// for (String host : hosts) {
		// System.out.println("host:" + host);
		// }
		// }
		// fs.close();
	}

}
